import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../../common/test_page.dart';
import '../../utils.dart';

/// Builds the shared source stream used by the throttle test cases.
///
/// Emits the integers 1 through 10, one value every 100 milliseconds, and
/// then closes.
Stream<int> _stream() {
  const interval = Duration(milliseconds: 100);
  // The periodic tick counter starts at 0, so shift it to start at 1.
  final ticks = Stream<int>.periodic(interval, (tick) => tick + 1);
  return ticks.take(10);
}

/// Test page that registers cases for rxdart's `throttle` and `throttleTime`
/// operators.
///
/// Every case is registered from the constructor via `test`. Most cases feed
/// the operator from [_stream], which emits 1 through 10 at 100 ms intervals.
///
/// NOTE(review): every `expect` call here passes only the actual value and no
/// matcher, so what is actually verified depends on the harness's `expect`
/// implementation, which is not visible from this file — confirm.
class ThrottleTestPage extends TestPage {
  ThrottleTestPage(super.title) {
    // ---------------------------------------------------------------------
    // throttle (window supplied as a stream factory)
    // ---------------------------------------------------------------------

    test('Rx.throttle', () async {
      expect(
        _stream()
            .throttle(
                (_) => Stream<void>.periodic(const Duration(milliseconds: 250)))
            .take(3),
      );
    });

    test('Rx.throttle.trailing', () async {
      expect(
        _stream()
            .throttle(
              (_) => Stream<void>.periodic(const Duration(milliseconds: 250)),
              trailing: true,
              leading: false,
            )
            .take(3),
      );
    });

    test('Rx.throttle.dynamic.window', () async {
      // The value 1 opens a 10 ms window; every other value opens a 250 ms
      // window.
      expect(
        _stream()
            .throttle((value) => value == 1
                ? Stream<void>.periodic(const Duration(milliseconds: 10))
                : Stream<void>.periodic(const Duration(milliseconds: 250)))
            .take(3),
      );
    });

    test('Rx.throttle.dynamic.window.trailing', () async {
      // Same dynamic window as above, but emitting on the trailing edge only.
      expect(
        _stream()
            .throttle(
              (value) => value == 1
                  ? Stream<void>.periodic(const Duration(milliseconds: 10))
                  : Stream<void>.periodic(const Duration(milliseconds: 250)),
              trailing: true,
              leading: false,
            )
            .take(3),
      );
    });

    test('Rx.throttle.leading.trailing.1', () async {
      // Records every value for which the window factory is invoked.
      final values = <int>[];

      // Appends an 11th value, delivered by a 100 ms timer, to the source.
      final stream = _stream()
          .concatWith([Rx.timer(11, const Duration(milliseconds: 100))])
          .throttle(
        (v) {
          values.add(v);
          return Stream<void>.periodic(const Duration(milliseconds: 250));
        },
        leading: true,
        trailing: true,
      );

      expect(stream);
      // NOTE(review): `values` is handed to `expect` right after `stream`,
      // with nothing awaited in between — confirm the harness accounts for
      // that.
      expect(values);
    });

    test('Rx.throttle.leading.trailing.2', () async {
      // Records every value for which the window factory is invoked.
      final values = <int>[];

      final stream = _stream().throttle(
        (v) {
          values.add(v);
          return Stream<void>.periodic(const Duration(milliseconds: 250));
        },
        leading: true,
        trailing: true,
      );

      expect(stream);
      // NOTE(review): see the note in 'Rx.throttle.leading.trailing.1'.
      expect(values);
    });

    test('Rx.throttle.reusable', () async {
      final transformer = ThrottleStreamTransformer<int>(
          (_) => Stream<void>.periodic(const Duration(milliseconds: 250)));

      // The same transformer instance is applied to two independent streams.
      expect(_stream().transform(transformer).take(2));

      expect(_stream().transform(transformer).take(2));
    });

    test('Rx.throttle.asBroadcastStream', () async {
      final future = _stream()
          .asBroadcastStream()
          .throttle(
              (_) => Stream<void>.periodic(const Duration(milliseconds: 250)))
          .drain<void>();

      // The same future is passed to `expect` twice.
      expect(future);
      expect(future);
    });

    test('Rx.throttle.error.shouldThrowA', () async {
      final streamWithError = Stream<void>.error(Exception()).throttle(
          (_) => Stream<void>.periodic(const Duration(milliseconds: 250)));

      // NOTE(review): nothing awaits this subscription, so `onError` runs
      // after the test body has returned — confirm the harness still
      // observes the `expect` call made from the callback.
      streamWithError.listen(
        null,
        onError: (Object e, StackTrace s) {
          expect(e);
        },
      );
    });

    test('Rx.throttle.pause.resume', () async {
      late StreamSubscription<int> subscription;

      final controller = StreamController<int>();

      // Forward the first two throttled values into `controller`, then close
      // it and cancel the subscription.
      subscription = _stream()
          .throttle(
              (_) => Stream<void>.periodic(const Duration(milliseconds: 250)))
          .take(2)
          .listen(controller.add, onDone: () {
        controller.close();
        subscription.cancel();
      });

      expect(controller.stream);

      // After 150 ms, pause the subscription for a further 150 ms. Awaited so
      // the pause is scheduled before the test body completes instead of
      // being dropped as a fire-and-forget future.
      await Future<void>.delayed(const Duration(milliseconds: 150))
          .whenComplete(() => subscription
              .pause(Future<void>.delayed(const Duration(milliseconds: 150))));
    });

    test('Rx.throttle.nullable', () {
      nullableTest<String?>(
        (s) => s.throttle((_) => Stream<void>.empty()),
      );
    });

    // ---------------------------------------------------------------------
    // throttleTime (window supplied as a fixed duration)
    // ---------------------------------------------------------------------

    test('Rx.throttleTime', () async {
      expect(
        _stream().throttleTime(const Duration(milliseconds: 250)).take(3),
      );
    });

    test('Rx.throttleTime.trailing', () async {
      expect(
        _stream()
            .throttleTime(
              const Duration(milliseconds: 250),
              trailing: true,
              leading: false,
            )
            .take(3),
      );
    });

    test('Rx.throttleTime.reusable', () async {
      final transformer = ThrottleStreamTransformer<int>(
          (_) => Stream<void>.periodic(const Duration(milliseconds: 250)));

      // The same transformer instance is applied to two independent streams.
      expect(_stream().transform(transformer).take(2));

      expect(_stream().transform(transformer).take(2));
    });

    test('Rx.throttleTime.asBroadcastStream', () async {
      final future = _stream()
          .asBroadcastStream()
          .throttleTime(const Duration(milliseconds: 250))
          .drain<void>();

      // The same future is passed to `expect` twice.
      expect(future);
      expect(future);
    });

    test('Rx.throttleTime.error.shouldThrowA', () async {
      final streamWithError = Stream<void>.error(Exception())
          .throttleTime(const Duration(milliseconds: 200));

      // NOTE(review): nothing awaits this subscription, so `onError` runs
      // after the test body has returned — confirm the harness still
      // observes the `expect` call made from the callback.
      streamWithError.listen(
        null,
        onError: (Object e, StackTrace s) {
          expect(e);
        },
      );
    });

    test('Rx.throttleTime.pause.resume', () async {
      late StreamSubscription<int> subscription;

      final controller = StreamController<int>();

      // Forward the first two throttled values into `controller`, then close
      // it and cancel the subscription.
      subscription = _stream()
          .throttleTime(const Duration(milliseconds: 250))
          .take(2)
          .listen(controller.add, onDone: () {
        controller.close();
        subscription.cancel();
      });

      expect(controller.stream);

      // After 150 ms, pause the subscription for a further 150 ms. Awaited so
      // the pause is scheduled before the test body completes instead of
      // being dropped as a fire-and-forget future.
      await Future<void>.delayed(const Duration(milliseconds: 150))
          .whenComplete(() => subscription
              .pause(Future<void>.delayed(const Duration(milliseconds: 150))));
    });

    test('issue/417 trailing true', () async {
      // Ten values spaced 25 ms apart, throttled with a 50 ms trailing window.
      expect(
        Stream.fromIterable([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
            .interval(const Duration(milliseconds: 25))
            .throttleTime(
              const Duration(milliseconds: 50),
              trailing: true,
              leading: false,
            ),
      );
    });

    test('issue/417 trailing false', () async {
      // Same source as above with trailing emission disabled.
      expect(
        Stream.fromIterable([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
            .interval(const Duration(milliseconds: 25))
            .throttleTime(const Duration(milliseconds: 50), trailing: false),
      );
    });

    test('Rx.throttleTime.nullable', () {
      nullableTest<String?>(
        (s) => s.throttleTime(Duration.zero),
      );
    });
  }
}